mydump: read row group data at once for small parquet files (#66071) #66574

ti-chi-bot wants to merge 1 commit into pingcap:release-8.5 from
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>

@joechenrh This PR has conflicts, I have put it on hold.

This cherry pick PR is for a release branch and has not yet been approved by triage owners. To merge this cherry pick:

Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@ti-chi-bot: If you want to know how to resolve it, please read the guide in TiDB Dev Guide. Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: The full list of commands accepted by this bot can be found here. Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing
📝 Walkthrough

The PR refactors parquet parsing in TiDB Lightning to use a rowGroup-based architecture with per-column iterators and in-memory prefetching strategies. It enhances the storage layer for bounded reads with proper offset tracking and updates build dependencies accordingly. Multiple unresolved merge conflicts require resolution.

Changes
Sequence Diagram(s)

sequenceDiagram
actor Client
participant ParquetParser
participant rowGroupParser
participant ColumnIterator
participant ParquetWrapper
participant Storage
Client->>ParquetParser: NewParquetParser(ctx, store, reader, path, meta)
ParquetParser->>rowGroupParser: init() with metadata
rowGroupParser->>ColumnIterator: createColumnIterator per column
ColumnIterator->>ParquetWrapper: getBuilder() select in-memory or streaming
alt Small Row Group (in-memory)
ParquetWrapper->>Storage: ReadDataInRange(concurrent batch reads)
Storage-->>ParquetWrapper: row group bytes
ParquetWrapper->>ColumnIterator: inMemoryParquetWrapper with prefetched data
else Large Row Group (streaming)
ParquetWrapper->>ColumnIterator: parquetWrapper with skip-buffer optimization
end
ColumnIterator-->>rowGroupParser: iterator ready
Client->>rowGroupParser: readRow()
rowGroupParser->>ColumnIterator: read from each column iterator
ColumnIterator-->>rowGroupParser: column values
rowGroupParser-->>Client: aggregated row
Client->>ParquetParser: moveToNextRowGroup()
ParquetParser->>rowGroupParser: rebuild iterators for next group
Client->>ParquetParser: Close()
ParquetParser->>rowGroupParser: Close() with error aggregation
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it is a critical failure.

🔧 golangci-lint (2.5.0): Error: can't load config: unsupported version of the configuration: "". See https://golangci-lint.run/docs/product/migration-guide for migration instructions.
@ti-chi-bot: The following tests failed, say

Full PR test history. Your PR dashboard. Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
br/pkg/storage/BUILD.bazel (1)

89-130: ⚠️ Potential issue | 🟠 Major: Regenerate Bazel metadata for this BUILD change.

This BUILD file was edited, so generated Bazel metadata should be refreshed and committed in this PR.

As per coding guidelines: **/BUILD.bazel: MUST run `make bazel_prepare` and include resulting Bazel metadata changes in the PR when Bazel files are changed.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@br/pkg/storage/BUILD.bazel` around lines 89 - 130: The BUILD change in br/pkg/storage/BUILD.bazel (the storage_test rule where shard_count was modified) requires regenerated Bazel metadata; run `make bazel_prepare` from the repo root to refresh generated Bazel files, verify the changes (including any touched metadata for the storage package), and add/commit the resulting generated files to this PR so the BUILD.bazel edit is accompanied by updated Bazel metadata.

pkg/lightning/mydump/BUILD.bazel (1)

3-125: ⚠️ Potential issue | 🟠 Major: Run `make bazel_prepare` after this BUILD update.

This file changed Bazel targets/deps, so generated Bazel metadata should be updated in the same PR.

As per coding guidelines: **/BUILD.bazel: MUST run `make bazel_prepare` and include resulting Bazel metadata changes in the PR when Bazel files are changed.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pkg/lightning/mydump/BUILD.bazel` around lines 3 - 125: The BUILD change added/modified dependencies for the go_library target named "mydump" and the go_test target "mydump_test", but the generated Bazel metadata is missing; run `make bazel_prepare` locally to regenerate Bazel metadata and commit the produced changes (the updated generated files) into this PR so the BUILD changes and generated metadata stay in sync.

pkg/lightning/mydump/parquet_parser.go (4)
294-322: ⚠️ Potential issue | 🔴 Critical: Unresolved merge conflict in method implementations.

This section contains conflicting code between the parquetFileWrapper methods (HEAD) and the rowGroupParser.Close() method (cherry-pick). The new Close() method using common.OnceError is part of the intended refactor. Resolve by keeping the rowGroupParser.Close() implementation and removing the obsolete parquetFileWrapper methods that belong to the old architecture.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pkg/lightning/mydump/parquet_parser.go` around lines 294 - 322: Remove the leftover conflicting parquetFileWrapper methods (Seek, Write, Open) from this hunk and keep the new rowGroupParser.Close implementation that uses common.OnceError; specifically delete the parquetFileWrapper.Seek, parquetFileWrapper.Write and parquetFileWrapper.Open blocks and ensure the retained function is the rowGroupParser.Close that iterates rgp.iterators and rgp.readers and returns onceErr.Get(), preserving the common.OnceError usage.
598-621: ⚠️ Potential issue | 🔴 Critical: Unresolved merge conflict in the OpenParquetReader function.

The HEAD branch contains OpenParquetReader, which the cherry-picked commit removes. Based on the AI summary, the new architecture eliminates the Parquet-specific OpenParquetReader path. Resolve by removing the OpenParquetReader function if it's no longer needed in the new row-group-based architecture.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pkg/lightning/mydump/parquet_parser.go` around lines 598 - 621: Remove the leftover OpenParquetReader function and its associated references (the symbol OpenParquetReader and the parquetFileWrapper initialization in this diff) because the new row-group-based architecture no longer uses the Parquet-specific path; ensure callers are updated to use the new reader creation path (or the generic store.Open return) and delete the OpenParquetReader declaration and any unused parquetFileWrapper-only fields to avoid the unresolved merge conflict.
652-670: ⚠️ Potential issue | 🔴 Critical: Unresolved merge conflict in NewParquetParser initialization.

The conflict is between different wrapper initialization approaches. The cherry-picked version uses a simpler parquetWrapper with a deferred close, while HEAD uses the more complex parquetFileWrapper. Resolve according to the new architecture's requirements.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pkg/lightning/mydump/parquet_parser.go` around lines 652 - 670: There is a leftover merge conflict in NewParquetParser: decide and implement the correct wrapper type (parquetWrapper vs parquetFileWrapper), remove the conflict markers, and make the initialization match the rest of the function: if the new architecture expects parquetWrapper, replace the HEAD block with logger := log.Wrap(logutil.Logger(ctx)); wrapper := &parquetWrapper{ReadSeekCloser: r} and add defer r.Close(); if the code expects parquetFileWrapper, keep the HEAD initialization (including store, ctx, path, skipBuf) and use logger := logutil.Logger(ctx) without the deferred close; ensure the chosen wrapper variable name and type (parquetWrapper or parquetFileWrapper) matches all subsequent uses in NewParquetParser and remove the git conflict markers.
710-732: ⚠️ Potential issue | 🔴 Critical: Unresolved merge conflict in the reader initialization loop.

HEAD contains a loop that creates sub-readers for each column using the old wrapper approach. The cherry-pick removes this in favor of the new buildRowGroupParser approach, which creates readers on demand. Resolve by removing this loop, since the new architecture handles per-column reader creation in buildRowGroupParser().

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pkg/lightning/mydump/parquet_parser.go` around lines 710 - 732: Remove the obsolete per-column reader initialization loop that creates subreaders (the block referencing subreaders := make(...), the for i := 1; i < fileSchema.NumColumns(); i++ loop, wrapper.Open(), and repeated file.NewParquetReader calls); the new design constructs per-column readers on demand via buildRowGroupParser(), so delete that entire loop and any variables only used by it (e.g., subreaders) and ensure the initial reader remains if needed by buildRowGroupParser().
🧹 Nitpick comments (1)
pkg/lightning/mydump/parquet_parser.go (1)
449-456: Consider adding a context cancellation check in the fallback builder.

The in-memory path (lines 440-446) uses the context from getBuilder, but the fallback newParquetWrapper path (lines 449-455) should also handle context cancellation gracefully.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pkg/lightning/mydump/parquet_parser.go` around lines 449 - 456: The fallback builder returned by getBuilder should check for context cancellation before calling newParquetWrapper; modify the anonymous function that returns (readerAtSeekerCloser, error) to first inspect ctx (e.g., select on ctx.Done() or check ctx.Err()) and return an appropriate error if cancelled, then proceed to call newParquetWrapper(pp.store, pp.path, &storeapi.ReaderOption{StartOffset: &ranges.columnStarts[c], EndOffset: &ranges.columnEnds[c]}) when the context is still valid; keep using the same ctx variable, and ensure any returned error propagates the context cancellation (ctx.Err()) rather than proceeding with the wrapper creation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@br/pkg/storage/BUILD.bazel`:
- Around line 107-111: The BUILD file contains unresolved Git conflict markers
(<<<<<<<, =======, >>>>>>>) around the shard_count setting which breaks Bazel;
remove the conflict markers and leave a single valid shard_count assignment
(choose the intended value—either 45 or 50—or confirm with the author), e.g.,
replace the whole conflicted block with a single line "shard_count =
<chosen_value>," and ensure there are no leftover markers or duplicate entries
for shard_count.
In `@br/pkg/storage/memstore_test.go`:
- Around line 121-124: The test is using store.Open with a wrong type reference:
replace the undefined storeapi.ReaderOption with the local storage package type
by passing &ReaderOption{ StartOffset: &start, EndOffset: &end } (i.e., use
ReaderOption from the same package as Open); update the call at r, err :=
store.Open(ctx, fileName, ...) to construct the correct *ReaderOption type so it
matches the Open signature.
In `@pkg/lightning/mydump/BUILD.bazel`:
- Around line 26-30: Remove the Git merge-conflict markers (<<<<<<<, =======,
>>>>>>>) and merge the dependency entries so the deps list contains the intended
entries (e.g. include both "//pkg/errno" and "//pkg/lightning/backend/external"
as normal strings) without any conflict markers; update the dependency block in
BUILD.bazel where the markers appear and apply the same fix to the other
occurrence noted (lines 58-62) so the file is valid for Bazel parsing.
In `@pkg/lightning/mydump/parquet_parser_test.go`:
- Around line 97-107: Remove all unresolved Git conflict markers in
parquet_parser_test.go and reconcile the two variants into a single consistent
implementation; specifically, replace the conflict block that contains manual
storage.NewLocalStorage/open/NewParquetParser calls with the test helper
newParquetParserForTest(context.Background(), t, dir, name, ParquetFileMeta{})
(and remove the corresponding defer/close duplication) so the test compiles, and
repeat this cleanup for the other listed ranges (156-172, 176-200, 204-240,
325-339, 346-449, 483-488, 518-524, 547-557, 567-573, 642-653), ensuring any
duplicate resource setup (storage.NewLocalStorage, store.Open, NewParquetParser)
is replaced by the helper and remaining code uses the helper’s returned reader
and its Close method.
- Around line 45-46: The test currently calls objstore.NewLocalStorage(dir)
which is undefined because the file imports storage as
"github.com/pingcap/tidb/br/pkg/storage"; replace the incorrect symbol with
storage.NewLocalStorage(dir) (or update the import alias to objstore if you
prefer) so the call matches the imported package; update any other occurrences
of objstore.* in parquet_parser_test.go to storage.* to fix the undefined
reference.
In `@pkg/lightning/mydump/parquet_parser.go`:
- Around line 227-238: The struct rowGroupParser has an unresolved merge
conflict: remove the HEAD fields (storage.ReadSeekCloser, lastOff, skipBuf,
store, path) and adopt the cherry-picked fields for the new design (readers
[]*file.Reader and iterators []iterator); update the rowGroupParser definition
to only include readers and iterators and ensure any uses of the removed fields
are refactored to the new per-column reader/iterator approach (search for
rowGroupParser, readers, iterators, storage.ReadSeekCloser, lastOff, skipBuf,
store, and path to update call sites).
- Around line 29-33: Resolve the merge conflict markers in parquet_parser.go
imports: remove the <<<<<<<, =======, and >>>>>>> lines and ensure both import
paths are present by adding both "github.com/pingcap/tidb/br/pkg/storage" and
"github.com/pingcap/tidb/pkg/lightning/common" in the import block; then run a
build to confirm there are no unused-import errors and adjust usages or add
aliasing if necessary to satisfy references in functions/methods within
parquet_parser.go (e.g., any calls referencing storage or common).
- Around line 372-426: In buildRowGroupParser, the goroutines started via eg.Go
capture the loop variable i and do not respect cancellation on egCtx; fix by
copying the loop index into a local var (e.g., idx := i) before calling eg.Go
and, at the top of each goroutine passed to eg.Go, check egCtx.Done() (return
early with ctx.Err() if canceled) to mirror the cancellation pattern used in
parquet_wrapper.go and schema_import.go; also add a short comment near
eg.SetLimit(8) explaining/justifying why the concurrency limit is 8 (or replace
with a named constant) so the rationale is documented.
In `@pkg/lightning/mydump/parquet_wrapper.go`:
- Around line 27-29: The imports of non-existent packages
"github.com/pingcap/tidb/pkg/objstore" and
"github.com/pingcap/tidb/pkg/objstore/storeapi" in parquet_wrapper.go should be
replaced with "br/pkg/storage"; update any references that rely on
ExternalStorage, ReaderOption, and ReadDataInRange to use the equivalents from
br/pkg/storage (they have the same names), and adjust import blocks wherever
those two old imports appear (other occurrences around the file: the other
import groups referenced in the review). Specifically, replace the two imports
at the top and update any code calling ReadDataInRange, using ExternalStorage or
ReaderOption to reference br/pkg/storage so the file compiles.
In `@pkg/lightning/mydump/parquet_writer.go`:
- Around line 56-84: The function writeParquetColumnBatch uses unchecked type
assertions (e.g., buf, _ := vals.([]int64)) which can yield nil/zero buffers and
corrupt output; update each case (for types like *file.Int96ColumnChunkWriter,
*file.Int64ColumnChunkWriter, *file.Float64ColumnChunkWriter,
*file.ByteArrayColumnChunkWriter, *file.FixedLenByteArrayColumnChunkWriter,
*file.Int32ColumnChunkWriter, *file.BooleanColumnChunkWriter) to perform the
comma-ok assertion (buf, ok := vals.([]T)) and if ok is false return a clear
error (e.g., fmt.Errorf("expected []T for %T but got %T", bufType, cw, vals))
instead of calling WriteBatch with a nil buffer, otherwise call WriteBatch(buf,
defLevels, nil) and propagate its error.
ℹ️ Review info
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)

- br/pkg/storage/BUILD.bazel
- br/pkg/storage/memstore.go
- br/pkg/storage/memstore_test.go
- br/pkg/storage/storage.go
- pkg/lightning/mydump/BUILD.bazel
- pkg/lightning/mydump/parquet_parser.go
- pkg/lightning/mydump/parquet_parser_test.go
- pkg/lightning/mydump/parquet_wrapper.go
- pkg/lightning/mydump/parquet_writer.go
- pkg/lightning/mydump/parser.go
💤 Files with no reviewable changes (1)
- pkg/lightning/mydump/parser.go
<<<<<<< HEAD:br/pkg/storage/BUILD.bazel
    shard_count = 50,
=======
    shard_count = 45,
>>>>>>> 1d9393d0cd7 (mydump: read row group data at once for small parquet files (#66071)):pkg/objstore/BUILD.bazel
Resolve merge-conflict markers before merge.
The conflict markers around shard_count make this BUILD file invalid and will break Bazel parsing/CI.
r, err := store.Open(ctx, fileName, &storeapi.ReaderOption{
	StartOffset: &start,
	EndOffset:   &end,
})
🧩 Analysis chain

🏁 Script executed:

# First, let's verify the file exists and check its size
cd /tmp && find . -name "memstore_test.go" -type f 2>/dev/null | head -5
wc -l br/pkg/storage/memstore_test.go 2>/dev/null || echo "File path needs adjustment"

Repository: pingcap/tidb
Length of output: 82

🏁 Script executed:

# Search for ReaderOption definitions
rg -n 'type\s+ReaderOption\s+struct' --type=go

Repository: pingcap/tidb
Length of output: 112

🏁 Script executed:

# Check for storeapi package references
rg -n 'storeapi' --type=go | head -20

Repository: pingcap/tidb
Length of output: 764

🏁 Script executed:

# Read the actual memstore_test.go file around the mentioned lines
head -160 br/pkg/storage/memstore_test.go | tail -50

Repository: pingcap/tidb
Length of output: 1154

🏁 Script executed:

# Check the beginning of memstore_test.go to see package declaration and imports
head -30 br/pkg/storage/memstore_test.go

Repository: pingcap/tidb
Length of output: 843

🏁 Script executed:

# Search for import statements in memstore_test.go
rg -n '^import|^package' br/pkg/storage/memstore_test.go -A 15 | head -40

Repository: pingcap/tidb
Length of output: 398

🏁 Script executed:

# Check what ReaderOption is in storage.go to understand the type
rg -A 10 'type ReaderOption struct' br/pkg/storage/storage.go

Repository: pingcap/tidb
Length of output: 378

🏁 Script executed:

# Verify that storeapi is not imported in memstore_test.go
grep -n "import\|storeapi" br/pkg/storage/memstore_test.go | head -30

Repository: pingcap/tidb
Length of output: 132

🏁 Script executed:

# Double-check if there's any storeapi import that we may have missed
rg -n 'storeapi' br/pkg/storage/memstore_test.go

Repository: pingcap/tidb
Length of output: 120

🏁 Script executed:

# Verify the complete import section of memstore_test.go
sed -n '14,26p' br/pkg/storage/memstore_test.go

Repository: pingcap/tidb
Length of output: 205

🏁 Script executed:

# Check what the Open method signature expects for the third parameter
rg -B 5 -A 10 'func.*Open.*ReaderOption' br/pkg/storage/storage.go | head -30

Repository: pingcap/tidb
Length of output: 38

🏁 Script executed:

# Search for the Open method definition in memstore.go or storage.go
rg -n 'func.*Open' br/pkg/storage/storage.go -A 3 | head -20

Repository: pingcap/tidb
Length of output: 38

🏁 Script executed:

# Check memstore.go for the Open implementation
rg -n 'func.*Open' br/pkg/storage/memstore.go -A 3 | head -20

Repository: pingcap/tidb
Length of output: 252

🏁 Script executed:

# Final verification: confirm storeapi is NOT defined/aliased in storage package context
rg -n 'storeapi' br/pkg/storage/storage.go

Repository: pingcap/tidb
Length of output: 38
Use ReaderOption from the current storage package, not storeapi.ReaderOption.
storeapi is not imported in this file, making storeapi.ReaderOption undefined at line 121. The Open method expects *ReaderOption from the same storage package.
💡 Proposed fix

- r, err := store.Open(ctx, fileName, &storeapi.ReaderOption{
+ r, err := store.Open(ctx, fileName, &ReaderOption{
  	StartOffset: &start,
  	EndOffset:   &end,
  })
<<<<<<< HEAD
=======
    "//pkg/errno",
    "//pkg/lightning/backend/external",
>>>>>>> 1d9393d0cd7 (mydump: read row group data at once for small parquet files (#66071))
Resolve merge-conflict markers in deps list.
The <<<<<<< / ======= / >>>>>>> blocks make the Bazel file invalid and block build/test execution.
Also applies to: 58-62
store, err := objstore.NewLocalStorage(dir)
require.NoError(t, err)
🧩 Analysis chain

🏁 Script executed:

cat pkg/lightning/mydump/parquet_parser_test.go | head -60

Repository: pingcap/tidb
Length of output: 1614

🏁 Script executed:

rg -n --type=go 'func\s+NewLocalStorage\s*\('

Repository: pingcap/tidb
Length of output: 142

🏁 Script executed:

rg -n --type=go 'import\s*\(' pkg/lightning/mydump/parquet_parser_test.go -A 30

Repository: pingcap/tidb
Length of output: 824

🏁 Script executed:

fd -e go | xargs rg -l 'func NewLocalStorage' | head -20

Repository: pingcap/tidb
Length of output: 79
Fix undefined reference by using the correct imported storage package.
The test file imports storage from "github.com/pingcap/tidb/br/pkg/storage" but references undefined objstore.NewLocalStorage. The correct call is storage.NewLocalStorage(dir).
💡 Proposed fix

- store, err := objstore.NewLocalStorage(dir)
+ store, err := storage.NewLocalStorage(dir)
<<<<<<< HEAD
store, err := storage.NewLocalStorage(dir)
require.NoError(t, err)
r, err := store.Open(context.Background(), name, nil)
require.NoError(t, err)
reader, err := NewParquetParser(context.Background(), store, r, name, ParquetFileMeta{})
require.NoError(t, err)
defer reader.Close()
=======
reader := newParquetParserForTest(context.Background(), t, dir, name, ParquetFileMeta{})
>>>>>>> 1d9393d0cd7 (mydump: read row group data at once for small parquet files (#66071))
Resolve all merge-conflict markers in this test file.
The file still contains unresolved conflict blocks, so it cannot compile.
Also applies to: 156-172, 176-200, 204-240, 325-339, 346-449, 483-488, 518-524, 547-557, 567-573, 642-653
```
<<<<<<< HEAD
	"github.com/pingcap/tidb/br/pkg/storage"
=======
	"github.com/pingcap/tidb/pkg/lightning/common"
>>>>>>> 1d9393d0cd7 (mydump: read row group data at once for small parquet files (#66071))
```
Unresolved merge conflict in imports causes build failure.
The merge conflict markers are present, causing the static analysis compilation failures. The conflict is between the storage import path on the release-8.5 branch and the common import path from the cherry-picked commit.
```diff
-<<<<<<< HEAD
 "github.com/pingcap/tidb/br/pkg/storage"
-=======
 "github.com/pingcap/tidb/pkg/lightning/common"
->>>>>>> 1d9393d0cd7 (mydump: read row group data at once for small parquet files (#66071))
```
Both imports may be needed based on the code usage; resolve by keeping both imports and removing the conflict markers.
📝 Committable suggestion
```diff
-<<<<<<< HEAD
-	"github.com/pingcap/tidb/br/pkg/storage"
-=======
-	"github.com/pingcap/tidb/pkg/lightning/common"
->>>>>>> 1d9393d0cd7 (mydump: read row group data at once for small parquet files (#66071))
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/pkg/lightning/common"
```
🧰 Tools
🪛 GitHub Check: Compile for FreeBSD job
[failure] 29-29:
missing import path
🪛 GitHub Check: Compile for ubuntu-latest
[failure] 29-29:
missing import path
🪛 GitHub Check: integration-test (5.7.35)
[failure] 29-29:
missing import path
🪛 GitHub Check: integration-test (8.0.22)
[failure] 29-29:
missing import path
🪛 GitHub Check: integration-test (8.0.26)
[failure] 29-29:
missing import path
🪛 GitHub Check: integration-test (8.0.37)
[failure] 29-29:
missing import path
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/lightning/mydump/parquet_parser.go` around lines 29 - 33, Resolve the
merge conflict markers in parquet_parser.go imports: remove the <<<<<<<,
=======, and >>>>>>> lines and ensure both import paths are present by adding
both "github.com/pingcap/tidb/br/pkg/storage" and
"github.com/pingcap/tidb/pkg/lightning/common" in the import block; then run a
build to confirm there are no unused-import errors and adjust usages or add
aliasing if necessary to satisfy references in functions/methods within
parquet_parser.go (e.g., any calls referencing storage or common).
```
<<<<<<< HEAD
	storage.ReadSeekCloser
	lastOff int64
	skipBuf []byte

	// current file path and store, used to open file
	store storage.ExternalStorage
	path  string
=======
	readers   []*file.Reader
	iterators []iterator
>>>>>>> 1d9393d0cd7 (mydump: read row group data at once for small parquet files (#66071))
```
Unresolved merge conflict in struct definition.
The rowGroupParser struct definition contains conflicting fields between the HEAD branch (which uses storage wrapper fields) and the cherry-picked commit (which uses per-column readers and iterators).
Resolve this conflict by determining the correct field set for the new architecture. Based on the PR objectives, the cherry-picked version with readers and iterators fields appears to be the intended implementation.
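The resolved struct from the cherry-picked side would look roughly as follows; this is a minimal sketch in which `fileReader` and `iterator` are placeholder stand-ins, since the real `file.Reader` and iterator types come from the parquet library and the TiDB code:

```go
package main

import "fmt"

// Placeholder stand-ins for the parquet library's file.Reader and the
// parser's per-column iterator; the real types live in the TiDB code.
type fileReader struct{ column int }
type iterator interface{ Next() bool }

// rowGroupParser, resolved to the cherry-picked layout: one reader and
// one iterator per column, replacing the old storage-wrapper fields
// (ReadSeekCloser, lastOff, skipBuf, store, path).
type rowGroupParser struct {
	readers   []*fileReader
	iterators []iterator
}

func newRowGroupParser(numCols int) *rowGroupParser {
	p := &rowGroupParser{
		readers:   make([]*fileReader, numCols),
		iterators: make([]iterator, numCols),
	}
	for i := range p.readers {
		p.readers[i] = &fileReader{column: i}
	}
	return p
}

func main() {
	p := newRowGroupParser(3)
	fmt.Println(len(p.readers), len(p.iterators))
}
```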
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/lightning/mydump/parquet_parser.go` around lines 227 - 238, The struct
rowGroupParser has an unresolved merge conflict: remove the HEAD fields
(storage.ReadSeekCloser, lastOff, skipBuf, store, path) and adopt the
cherry-picked fields for the new design (readers []*file.Reader and iterators
[]iterator); update the rowGroupParser definition to only include readers and
iterators and ensure any uses of the removed fields are refactored to the new
per-column reader/iterator approach (search for rowGroupParser, readers,
iterators, storage.ReadSeekCloser, lastOff, skipBuf, store, and path to update
call sites).
```go
func (pp *ParquetParser) buildRowGroupParser() (err error) {
	eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(pp.ctx)
	eg.SetLimit(8)

	builder, err := pp.getBuilder(egCtx)
	if err != nil {
		return errors.Trace(err)
	}

	readers := make([]*file.Reader, pp.fileMeta.NumColumns())
	defer func() {
		if err != nil {
			for _, r := range readers {
				if r != nil {
					_ = r.Close()
				}
			}
		}
	}()

	for i := range pp.fileMeta.NumColumns() {
		eg.Go(func() error {
			wrapper, err := builder(i)
			if err != nil {
				return errors.Trace(err)
			}

			reader, err := file.NewParquetReader(
				wrapper,
				file.WithReadProps(pp.prop),
				file.WithMetadata(pp.fileMeta),
			)
			if err != nil {
				_ = wrapper.Close()
				return errors.Trace(err)
			}
			readers[i] = reader
			return nil
		})
	}

	if err := eg.Wait(); err != nil {
		return errors.Trace(err)
	}

	rgp := &rowGroupParser{
		rowGroup: pp.curRowGroup,
		readers:  readers,
	}
	if err := rgp.init(pp.colTypes, pp.loc); err != nil {
		return errors.Trace(err)
	}
	pp.rowGroup = rgp
	return nil
}
```
🧩 Analysis chain
🏁 Scripts executed against pingcap/tidb:
```shell
# Verify SetLimit patterns in the codebase
echo "=== Searching for SetLimit(8) occurrences ==="
rg -n "SetLimit\(8\)" --type=go
echo ""
echo "=== Searching for all SetLimit patterns with context ==="
rg -n "SetLimit\([0-9]+\)" --type=go -B2 -A2

# Check the file for license header and overall structure
head -20 pkg/lightning/mydump/parquet_parser.go

# Search for similar error group patterns and context usage
rg -n "eg, egCtx := util.NewErrorGroupWithRecoverWithCtx" --type=go -A10

# Check if egCtx.Done() or context cancellation is used in similar goroutines
rg -n "select.*egCtx.Done\(\)" --type=go
rg -n "egCtx.Done()" --type=go -B3 -A3
```
Add context cancellation check to goroutines and ensure loop variable capture safety.
The goroutines in the error group do not check egCtx.Done() for cancellation, unlike the pattern used elsewhere in the codebase (e.g., parquet_wrapper.go, schema_import.go). Add a cancellation check early in each goroutine to respect context cancellation.
The loop variable i is captured in the goroutine closure at line 390+. Depending on the Go version, this may cause all goroutines to use the final value of i. Either copy i to a local variable before calling eg.Go() or document the minimum Go version requirement.
The hardcoded SetLimit(8) appears in both this function and parquet_wrapper.go. Consider documenting why 8 is the chosen concurrency limit.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/lightning/mydump/parquet_parser.go` around lines 372 - 426, In
buildRowGroupParser, the goroutines started via eg.Go capture the loop variable
i and do not respect cancellation on egCtx; fix by copying the loop index into a
local var (e.g., idx := i) before calling eg.Go and, at the top of each
goroutine passed to eg.Go, check egCtx.Done() (return early with ctx.Err() if
canceled) to mirror the cancellation pattern used in parquet_wrapper.go and
schema_import.go; also add a short comment near eg.SetLimit(8)
explaining/justifying why the concurrency limit is 8 (or replace with a named
constant) so the rationale is documented.
```go
	"github.com/pingcap/tidb/pkg/objstore"
	"github.com/pingcap/tidb/pkg/objstore/storeapi"
	"github.com/pingcap/tidb/pkg/util"
```
🧩 Analysis chain
🏁 Scripts executed against pingcap/tidb:
```shell
# Check if the file exists
ls -la pkg/lightning/mydump/parquet_wrapper.go 2>/dev/null || echo "File not found"
# List contents of pkg/objstore directory
fd -t d 'objstore' pkg --max-depth 3
# Search for the storeapi package definition
rg -l 'package storeapi' --type=go
# Check for ReadDataInRange function
rg -n 'func ReadDataInRange' --type=go
# Check if br/pkg/storage exists
ls -la pkg/br/pkg/storage 2>/dev/null || echo "Directory not found"
# Alternative check: look for storage packages
fd -t d 'storage' pkg --max-depth 2

# Check the actual imports in the parquet_wrapper.go file if it exists
if [ -f pkg/lightning/mydump/parquet_wrapper.go ]; then
  echo "=== File contents (first 50 lines) ==="
  head -50 pkg/lightning/mydump/parquet_wrapper.go
fi

# Search for objstore imports in the codebase
rg -n '"github.com/pingcap/tidb/pkg/objstore"' --type=go | head -20
# Search for br/pkg/storage imports
rg -n '"github.com/pingcap/tidb/br/pkg/storage"' --type=go | head -20

# Check if pkg/objstore exists as a directory
ls -la pkg/objstore 2>&1 | head -20
# Search for package objstore and storeapi definitions
rg -n 'package objstore' --type=go
rg -n 'package storeapi' --type=go
# Look at what functions/types parquet_wrapper uses from objstore
rg -n 'objstore\.' pkg/lightning/mydump/parquet_wrapper.go
rg -n 'storeapi\.' pkg/lightning/mydump/parquet_wrapper.go

# Check the full import section of parquet_wrapper.go
sed -n '17,31p' pkg/lightning/mydump/parquet_wrapper.go
# Check for function signatures that use storeapi types
rg -A 5 'storeapi\.' pkg/lightning/mydump/parquet_wrapper.go
rg -A 5 'objstore\.' pkg/lightning/mydump/parquet_wrapper.go

# Verify the actual types and functions in br/pkg/storage
grep -n 'type Storage ' br/pkg/storage/storage.go | head -5
grep -n 'type ReaderOption ' br/pkg/storage/storage.go | head -5
grep -n 'type ReadSeekCloser ' br/pkg/storage/storage.go | head -5
grep -n 'func ReadDataInRange' br/pkg/storage/storage.go -A 3
# Check the Open method signature in br/pkg/storage
grep -n 'func.*Open' br/pkg/storage/storage.go | head -10
rg -B 2 -A 8 'func.*\(e.*ExternalStorage\).*Open\(' br/pkg/storage/storage.go

# Check if there's any reference to a storeapi package anywhere
rg -l 'package.*storeapi' --type=go
# Check all files that might define objstore or storeapi
fd -t f 'storage.go' | xargs grep -l 'type Storage '
# Check if ReadSeekCloser is defined elsewhere
rg 'type ReadSeekCloser' --type=go
# Check what types are defined in br/pkg/storage
rg -n '^type ' br/pkg/storage/storage.go | head -20

# Get full context of the Open method signature in br/pkg/storage
cat -n br/pkg/storage/storage.go | grep -A 5 'Open.*context.Context'

# Final verification: confirm pkg/objstore/storeapi does not exist anywhere
find . -type d -name 'objstore' 2>/dev/null
find . -type d -name 'storeapi' 2>/dev/null
# Check if there are any files defining types from storeapi
rg 'type.*Storage.*interface' --type=go br/pkg/storage/storage.go | head -5
# Verify ExternalFileReader return type from Open method
sed -n '130,145p' br/pkg/storage/storage.go
# Check what the ExternalFileReader interface looks like
sed -n '166,180p' br/pkg/storage/storage.go
```
pkg/objstore and pkg/objstore/storeapi packages do not exist—imports will fail to compile.
The file imports from non-existent packages on lines 27–28. Replace these with br/pkg/storage, which provides the equivalent types (ExternalStorage, ReaderOption) and ReadDataInRange function. All other Lightning/dumpling files in the codebase use br/pkg/storage, not pkg/objstore.
Migration reference
```diff
-	"github.com/pingcap/tidb/pkg/objstore"
-	"github.com/pingcap/tidb/pkg/objstore/storeapi"
+	"github.com/pingcap/tidb/br/pkg/storage"
 ...
-	storeapi.ReadSeekCloser
+	storage.ReadSeekCloser
 ...
-	store storeapi.Storage,
+	store storage.ExternalStorage,
 ...
-	opts *storeapi.ReaderOption,
+	opts *storage.ReaderOption,
 ...
-	_, err := objstore.ReadDataInRange(
+	_, err := storage.ReadDataInRange(
```
Also applies to: 53–54, 107–110, 151–153, 183–184, 196–199
🧰 Tools
🪛 GitHub Actions: Dumpling
[error] 27-27: Build failed: no required module provides package github.com/pingcap/tidb/pkg/objstore. Command: CGO_ENABLED=1 GO111MODULE=on go build -tags codes -ldflags '-X "github.com/pingcap/tidb/pkg/parser/mysql.TiDBReleaseVersion=fb36d00-dirty" -X "github.com/pingcap/tidb/pkg/util/versioninfo.TiDBBuildTS=" -X "github.com/pingcap/tidb/pkg/util/versioninfo.TiDBGitHash=fb36d00deaad5426fb0533e9385e6ae3a17a573c" -X "github.com/pingcap/tidb/pkg/util/versioninfo.TiDBGitBranch=HEAD" -X "github.com/pingcap/tidb/pkg/util/versioninfo.TiDBEdition=Community" ' -o bin/tidb-server ./cmd/tidb-server
🪛 GitHub Check: Compile for FreeBSD job
[failure] 28-28:
no required module provides package github.com/pingcap/tidb/pkg/objstore/storeapi; to add it:
[failure] 27-27:
no required module provides package github.com/pingcap/tidb/pkg/objstore; to add it:
🪛 GitHub Check: Compile for ubuntu-latest
[failure] 28-28:
no required module provides package github.com/pingcap/tidb/pkg/objstore/storeapi; to add it:
[failure] 27-27:
no required module provides package github.com/pingcap/tidb/pkg/objstore; to add it:
🪛 GitHub Check: integration-test (5.7.35)
[failure] 28-28:
no required module provides package github.com/pingcap/tidb/pkg/objstore/storeapi; to add it:
[failure] 27-27:
no required module provides package github.com/pingcap/tidb/pkg/objstore; to add it:
🪛 GitHub Check: integration-test (8.0.22)
[failure] 28-28:
no required module provides package github.com/pingcap/tidb/pkg/objstore/storeapi; to add it:
[failure] 27-27:
no required module provides package github.com/pingcap/tidb/pkg/objstore; to add it:
🪛 GitHub Check: integration-test (8.0.26)
[failure] 28-28:
no required module provides package github.com/pingcap/tidb/pkg/objstore/storeapi; to add it:
[failure] 27-27:
no required module provides package github.com/pingcap/tidb/pkg/objstore; to add it:
🪛 GitHub Check: integration-test (8.0.37)
[failure] 28-28:
no required module provides package github.com/pingcap/tidb/pkg/objstore/storeapi; to add it:
[failure] 27-27:
no required module provides package github.com/pingcap/tidb/pkg/objstore; to add it:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/lightning/mydump/parquet_wrapper.go` around lines 27 - 29, The imports of
non-existent packages "github.com/pingcap/tidb/pkg/objstore" and
"github.com/pingcap/tidb/pkg/objstore/storeapi" in parquet_wrapper.go should be
replaced with "br/pkg/storage"; update any references that rely on
ExternalStorage, ReaderOption, and ReadDataInRange to use the equivalents from
br/pkg/storage (they have the same names), and adjust import blocks wherever
those two old imports appear (other occurrences around the file: the other
import groups referenced in the review). Specifically, replace the two imports
at the top and update any code calling ReadDataInRange, using ExternalStorage or
ReaderOption to reference br/pkg/storage so the file compiles.
```go
func writeParquetColumnBatch(cw file.ColumnChunkWriter, vals any, defLevels []int16) error {
	var err error
	switch w := cw.(type) {
	case *file.Int96ColumnChunkWriter:
		buf, _ := vals.([]parquet.Int96)
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.Int64ColumnChunkWriter:
		buf, _ := vals.([]int64)
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.Float64ColumnChunkWriter:
		buf, _ := vals.([]float64)
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.ByteArrayColumnChunkWriter:
		buf, _ := vals.([]parquet.ByteArray)
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.FixedLenByteArrayColumnChunkWriter:
		buf, _ := vals.([]parquet.FixedLenByteArray)
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.Int32ColumnChunkWriter:
		buf, _ := vals.([]int32)
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.BooleanColumnChunkWriter:
		buf, _ := vals.([]bool)
		_, err = w.WriteBatch(buf, defLevels, nil)
	default:
		return fmt.Errorf("unsupported column type %T", cw)
	}
	return err
}
```
🧩 Analysis chain
🏁 Scripts executed against pingcap/tidb:
```shell
# Check if the file exists and show the license header
head -20 pkg/lightning/mydump/parquet_writer.go
# Show lines 56-84 to confirm the code snippet
sed -n '56,84p' pkg/lightning/mydump/parquet_writer.go
# Check the full function context and who calls writeParquetColumnBatch
rg -B5 -A30 'func writeParquetColumnBatch' pkg/lightning/mydump/parquet_writer.go
# Find where this function is called to understand how vals is constructed
rg -n 'writeParquetColumnBatch' pkg/lightning/mydump/
# Show context around the call site at line 226
sed -n '200,240p' pkg/lightning/mydump/parquet_writer.go
# Look for where rowVals is constructed and how the type is guaranteed
rg -B20 'writeParquetColumnBatch.*rowVals' pkg/lightning/mydump/parquet_writer.go | head -100
# Check the sliceColumnData function signature and return type
sed -n '86,150p' pkg/lightning/mydump/parquet_writer.go
# Check how cw (ColumnChunkWriter) is obtained - from rgw.NextColumn()
rg -B5 -A5 'rgw.NextColumn' pkg/lightning/mydump/parquet_writer.go
# Check the relationship between column type and the writer type assigned
rg -B10 'for colIdx := range pcolumns' pkg/lightning/mydump/parquet_writer.go | head -30
# Verify the test-only usage by checking ParquetColumn documentation
sed -n '109,125p' pkg/lightning/mydump/parquet_writer.go
# Check if there's any non-test usage of these functions
rg -l 'parquet_writer\.go' pkg/lightning/ | xargs grep -l 'ParquetColumn\|writeParquetColumnBatch'
# Search for test files that use ParquetColumn
rg 'ParquetColumn' pkg/lightning/ --type go
# Check if writeParquetColumnBatch or sliceColumnData are used anywhere outside the file
rg 'writeParquetColumnBatch|sliceColumnData' --type go
```
Add explicit type assertion checks in writeParquetColumnBatch.
Type assertions without checking the ok value can silently pass zero/nil buffers to WriteBatch if the type doesn't match, potentially corrupting the parquet file. Each case should validate the assertion succeeds before proceeding.
Suggested fix pattern
```diff
 case *file.Int64ColumnChunkWriter:
-	buf, _ := vals.([]int64)
+	buf, ok := vals.([]int64)
+	if !ok {
+		return fmt.Errorf("expected []int64, got %T", vals)
+	}
 	_, err = w.WriteBatch(buf, defLevels, nil)
```
📝 Committable suggestion
The corrected function with checked assertions:
```go
func writeParquetColumnBatch(cw file.ColumnChunkWriter, vals any, defLevels []int16) error {
	var err error
	switch w := cw.(type) {
	case *file.Int96ColumnChunkWriter:
		buf, ok := vals.([]parquet.Int96)
		if !ok {
			return fmt.Errorf("expected []parquet.Int96, got %T", vals)
		}
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.Int64ColumnChunkWriter:
		buf, ok := vals.([]int64)
		if !ok {
			return fmt.Errorf("expected []int64, got %T", vals)
		}
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.Float64ColumnChunkWriter:
		buf, ok := vals.([]float64)
		if !ok {
			return fmt.Errorf("expected []float64, got %T", vals)
		}
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.ByteArrayColumnChunkWriter:
		buf, ok := vals.([]parquet.ByteArray)
		if !ok {
			return fmt.Errorf("expected []parquet.ByteArray, got %T", vals)
		}
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.FixedLenByteArrayColumnChunkWriter:
		buf, ok := vals.([]parquet.FixedLenByteArray)
		if !ok {
			return fmt.Errorf("expected []parquet.FixedLenByteArray, got %T", vals)
		}
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.Int32ColumnChunkWriter:
		buf, ok := vals.([]int32)
		if !ok {
			return fmt.Errorf("expected []int32, got %T", vals)
		}
		_, err = w.WriteBatch(buf, defLevels, nil)
	case *file.BooleanColumnChunkWriter:
		buf, ok := vals.([]bool)
		if !ok {
			return fmt.Errorf("expected []bool, got %T", vals)
		}
		_, err = w.WriteBatch(buf, defLevels, nil)
	default:
		return fmt.Errorf("unsupported column type %T", cw)
	}
	return err
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/lightning/mydump/parquet_writer.go` around lines 56 - 84, The function
writeParquetColumnBatch uses unchecked type assertions (e.g., buf, _ :=
vals.([]int64)) which can yield nil/zero buffers and corrupt output; update each
case (for types like *file.Int96ColumnChunkWriter, *file.Int64ColumnChunkWriter,
*file.Float64ColumnChunkWriter, *file.ByteArrayColumnChunkWriter,
*file.FixedLenByteArrayColumnChunkWriter, *file.Int32ColumnChunkWriter,
*file.BooleanColumnChunkWriter) to perform the comma-ok assertion (buf, ok :=
vals.([]T)) and if ok is false return a clear error (e.g., fmt.Errorf("expected
[]T for %T but got %T", bufType, cw, vals)) instead of calling WriteBatch with a
nil buffer, otherwise call WriteBatch(buf, defLevels, nil) and propagate its
error.
This is an automated cherry-pick of #66071
What problem does this PR solve?
Issue Number: close #66180
Problem Summary:
Found in a customer case.
As you can see, this parquet file contains a large number of extremely small columns. Currently, we create a separate reader for each column sequentially. When there are many such columns, the “opening” operation itself might become a bottleneck due to first-byte latency.
What changed and how does it work?
Previously we read the whole file into memory if the file size was <= 256 MiB, but I removed this in #63979 (it hurt Lightning performance and introduced extra memory usage). Now we add it back, with a smaller threshold and parallel reads.
Check List
Tests
Tested with the redacted data provided by the customer. Since the bottleneck in this case is encoding, we only check the read duration of a single chunk. As you can see, the read duration is reduced from 1m46s to 54s.
It also affects the time needed to submit the job:
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.